package com.shujia.base;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * 单元测试的注解：
 *
 * @Before 在Test方法执行之前执行
 * @Test 在方法上定义，可以不用main方法，单独运行
 * @After 在Test方法执行之后执行
 * <p>
 * 使用java操作数据库，被称之为jdbc
 * 1、注册驱动
 * 2、创建与数据库的连接对象
 * 3、创建操作数据库的对象
 * 4、执行方法
 * 5、分析结果
 * 6、释放资源
 * hbase基础中要做的需求：
 * 1、如何创建一张表
 * 2、如何删除一张表
 * 3、如何向一张表中添加一条数据
 * 4、如何向一张表中同时添加一批数据
 * 5、如何获取一条数据
 * 6、如果获取一批数据
 * 7、如何创建预分region表
 */
public class HbaseAPI {
    //成员变量
    private Connection conn;
    private Admin admin;

    @Before
    public void connection() {
        try {
            //创建hbase的运行环境对象
            //旧版本的写法，已弃用
//        HBaseConfiguration conf = new HBaseConfiguration();
            //新版本创建配置文件对象的方式
            Configuration conf = HBaseConfiguration.create();

            //设置zookeeper的节点信息
            conf.set("hbase.zookeeper.quorum", "master:2181,node1:2181,node2:2181");

            //创建hbase连接对象
            conn = ConnectionFactory.createConnection(conf);

            //创建数据库操作对象
//            HBaseAdmin hBaseAdmin = new HBaseAdmin(conn);

            admin = conn.getAdmin();

            System.out.println("成功获取数据库连接对象：" + conn);
            System.out.println("成功获取数据库操作对象：" + admin);
            System.out.println("=================================================");


        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 如何创建一张表
     * 在hbase中创建一张表的必要因素是表名和列簇
     * create 'test2','info'
     */
    @Test
    public void createOneTable() {
        /**在hbase中，表和列簇需要分开创建并设置，然后再将列簇对象添加到表中
         * 1、创建一个表的描述对象
         */
        //将表名封装成TableName的对象
        TableName name = TableName.valueOf("students");


        //在老版本中创建表描述器对象的方式 在2.0.0之后被弃用，将来会在3.0.0之后删除
        //public HTableDescriptor(final TableName name)
//        HTableDescriptor test2 = new HTableDescriptor(name);
        //新版本：推荐使用TableDescriptorBuilder来创建表描述器对象
        //public static TableDescriptorBuilder newBuilder(final TableName name)
        TableDescriptorBuilder test2 = TableDescriptorBuilder.newBuilder(name);

        try {

            /**
             * 2、创建一个列簇描述器对象
             */
            //旧版本的写法：public HColumnDescriptor(final String familyName) 在2.0.0之后被弃用，将来会在3.0.0之后删除
//        HColumnDescriptor info = new HColumnDescriptor("info");
            //新版本写法： ColumnFamilyDescriptorBuilder.of(String).
            ColumnFamilyDescriptor info = ColumnFamilyDescriptorBuilder.of("info");

            /**
             * 3、将列簇添加到表中
             */
            //旧版本的写法
//        test2.addColumnFamily(info);
            //新版本写法
            test2.setColumnFamily(info);

            /**
             * 判断表是否存在
             */
            if (admin.tableExists(name)) {
                System.out.println(Bytes.toString(name.getName()) + " 表已经存在！");
                return;
            }

            /**
             * 4、admin对象调用方法创建一张表
             */
            //void createTable(TableDescriptor desc)
            //TableDescriptorBuilder:
            //public TableDescriptor build() {
            //    return new ModifyableTableDescriptor(desc);
            //  }
            admin.createTable(test2.build());
            System.out.println(Bytes.toString(test2.build().getTableName().getName()) + "表创建 成功 SUCCEED！");
        } catch (Exception e) {
            System.out.println(Bytes.toString(test2.build().getTableName().getName()) + "表创建 失败！FAILED！");
            e.printStackTrace();
        }
    }

    /**
     * 2、如何删除一张表
     */
    @Test
    public void deleteOneTable() {
        try {
            //将表名封装成一个TableName对象
            TableName name = TableName.valueOf("test2");

            /**
             * 判断表是否存在
             */
            if (!admin.tableExists(name)) {
                System.out.println(Bytes.toString(name.getName()) + " 表不存在！无法删除！");
                return;
            }

            //禁用表
            admin.disableTable(name);

            //删除表
            admin.deleteTable(name);

            System.out.println(Bytes.toString(name.getName()) + "表删除 成功 SUCCEED！");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 3、如何向一张表中添加一列数据
     * 1500100001,施笑槐,22,女,文科六班
     * <p>
     * put 'students','1500100001','info:name','施笑槐'
     * put 'students','1500100001','info:age','22'
     * put 'students','1500100001','info:gender','女'
     * put 'students','1500100001','info:clazz','文科六班'
     */
    @Test
    public void putOneColData() {
        try {
            //将表名封装成一个TableName对象
            TableName name = TableName.valueOf("students");

            /**
             * 判断表是否存在
             */
            if (!admin.tableExists(name)) {
                System.out.println(Bytes.toString(name.getName()) + " 表不存在！无法添加数据！");
                return;
            }

            //获取表实例对象
            Table test2 = conn.getTable(name);

            //将一列数据封装成一个Put对象,传入一个行键，需要将行键变成一个字节数组的格式
            Put put = new Put(Bytes.toBytes("1500100001"));
            //设置列簇列名和列值
            //public Put addColumn(byte [] family, byte [] qualifier, byte [] value)
            //设置的方式1：
//            put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes("施笑槐"));
            //设置的方式2：
            // public Put add(Cell cell)
            //KeyValue类是Cell接口的实现类
            //public KeyValue(final byte [] row, final byte [] family, final byte [] qualifier, final byte [] value)
            KeyValue keyValue = new KeyValue(Bytes.toBytes("1500100001"),
                    Bytes.toBytes("info"),
                    Bytes.toBytes("age"),
                    Bytes.toBytes("22"));
            put.add(keyValue);


            //表对象调用put方法将一列或多列数据添加到表中
            //default void put(Put put)
            test2.put(put);

            System.out.println("一列数据添加完毕！");


        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 4、如何向一张表中同时添加一批数据
     */
    @Test
    public void putMoreData() {
        ArrayList<Put> puts = new ArrayList<>();
        try {
            //将表名封装成一个TableName对象
            TableName tableName = TableName.valueOf("students");

            /**
             * 判断表是否存在
             */
            if (!admin.tableExists(tableName)) {
                System.out.println(Bytes.toString(tableName.getName()) + " 表不存在！无法添加数据！");
                return;
            }

            //获取表实例对象
            Table students = conn.getTable(tableName);

            //InputStreamReader
            //FileReader
            //BufferedReader
            //创建字符缓冲输入流对象
            BufferedReader br = new BufferedReader(new FileReader("data/students.txt"));
            String line = null;
            Put put = null;
            while ((line = br.readLine()) != null) {
                //1500100001,施笑槐,22,女,文科六班
                String[] infos = line.split(",");
                byte[] rk = Bytes.toBytes(infos[0]);
                byte[] name = Bytes.toBytes(infos[1]);
                put = new Put(rk);
                put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), name);
                puts.add(put);

                byte[] age = Bytes.toBytes(infos[2]);
                put = new Put(rk);
                put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), age);
                puts.add(put);

                byte[] gender = Bytes.toBytes(infos[3]);
                put = new Put(rk);
                put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("gender"), gender);
                puts.add(put);

                byte[] clazz = Bytes.toBytes(infos[4]);
                put = new Put(rk);
                put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("clazz"), clazz);
                puts.add(put);
            }

            students.put(puts);
            System.out.println(Bytes.toString(tableName.getName()) + " 表所有列数据添加完毕！！");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 5、如何获取一条数据
     * get 'students','1500100001','info:name'
     */
    @Test
    public void getOneData() {
        try {
            //将表名封装成一个TableName对象
            TableName tableName = TableName.valueOf("students");

            /**
             * 判断表是否存在
             */
            if (!admin.tableExists(tableName)) {
                System.out.println(Bytes.toString(tableName.getName()) + " 表不存在！无法添加数据！");
                return;
            }

            //获取表实例对象
            Table students = conn.getTable(tableName);
            //创建一个Get对象
            Get get = new Get(Bytes.toBytes("1500100001"));


            //default Result get(Get get)
            Result result = students.get(get);

            /**
             * Hbase中Result类常用的方法：
             *  getRow() 获取行键的字节数组形式
             *  getValue(byte [] family, byte [] qualifier) 获取某一列值的字节数组形式
             *  listCells() 获取所有列单元格组成的List集合
             */

            //result中封装了许多的单元格，每一个单元格都是一个列
            //第一种获取数据的方式：列单独获取，getValue 获取一列数据的前提是要知道列簇和列名
//            String id = Bytes.toString(result.getRow());
//            String name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
//            String age = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")));
//            String gender = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("gender")));
//            String clazz = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("clazz")));
//            System.out.println("学号：" + id + ", 姓名：" + name + ", 年龄：" + age + ", 性别：" + gender + ", 班级：" + clazz);

            //第二种获取数据的方式：listCells()
            List<Cell> cells = result.listCells();
            for (Cell cell : cells) {
                //Hbase中提供了一个专门解析Cell单元格的工具类 CellUtil
                String id = Bytes.toString(CellUtil.cloneRow(cell));
                String cf = Bytes.toString(CellUtil.cloneFamily(cell));
                String colName = Bytes.toString(CellUtil.cloneQualifier(cell));
                String colValue = Bytes.toString(CellUtil.cloneValue(cell));
                System.out.println("行键：" + id + ", 列簇：" + cf + ", 列名：" + colName + ", 列值：" + colValue);
                System.out.println("--------------------------------------------");
            }


        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 6、如果获取一批数据
     * <p>
     * scan 'students'
     * scan 'students',LIMIT=>5
     * scan 'students', STARTROW=>'1500100013', STOPROW=>'1500100021'
     */
    @Test
    public void scanMoreData() {
        try {
            //将表名封装成一个TableName对象
            TableName tableName = TableName.valueOf("students");

            /**
             * 判断表是否存在
             */
            if (!admin.tableExists(tableName)) {
                System.out.println(Bytes.toString(tableName.getName()) + " 表不存在！无法添加数据！");
                return;
            }

            //获取表实例对象
            Table students = conn.getTable(tableName);
            //创建一个Scan对象
            Scan scan = new Scan(); //默认是查询所有行数据
            //可以对scan进行一些设置
//            scan.setLimit(5); //只查询前5行所有列数据
            //旧版本设置开始行行键和结束行键
//            scan.setStartRow(Bytes.toBytes("1500100013"));
//            scan.setStopRow(Bytes.toBytes("1500100021"));
            //新版本设置开始行行键和结束行键
            scan.withStartRow(Bytes.toBytes("1500100013"));
            //新版本中可以设置是否包含边界行
            scan.withStopRow(Bytes.toBytes("1500100021"), true);


            ResultScanner resultScanner = students.getScanner(scan);
            //获取一个迭代器对象，存放的是所有行的数据
            Iterator<Result> resultIterator = resultScanner.iterator();
            StringBuilder sb = null;
            while (resultIterator.hasNext()) {
                Result result = resultIterator.next();
                String id = Bytes.toString(result.getRow());
//                String name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
//                String age = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")));
//                String gender = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("gender")));
//                String clazz = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("clazz")));
//                System.out.println("学号：" + id + ", 姓名：" + name + ", 年龄：" + age + ", 性别：" + gender + ", 班级：" + clazz);

                List<Cell> cells = result.listCells();
                sb = new StringBuilder();
                sb.append("id：").append(id).append(", ");
                for (int i = 0; i < cells.size(); i++) {
                    //Hbase中提供了一个专门解析Cell单元格的工具类 CellUtil
                    String colName = Bytes.toString(CellUtil.cloneQualifier(cells.get(i)));
                    String colValue = Bytes.toString(CellUtil.cloneValue(cells.get(i)));
                    if (i != cells.size() - 1) {
                        sb.append(colName).append("：").append(colValue).append(", ");
                    } else {
                        sb.append(colName).append("：").append(colValue);
                    }
                }
                System.out.println(sb);
                System.out.println("--------------------------------------------");
            }


        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 7、如何创建预分region表
     */
    @Test
    public void createSplitTable(){
        TableName name = TableName.valueOf("tb_split2");

        TableDescriptorBuilder test2 = TableDescriptorBuilder.newBuilder(name);

        try {

            ColumnFamilyDescriptor info = ColumnFamilyDescriptorBuilder.of("info");

            test2.setColumnFamily(info);

            if (admin.tableExists(name)) {
                System.out.println(Bytes.toString(name.getName()) + " 表已经存在！");
                return;
            }


            byte[][] splitKeys = {
                    Bytes.toBytes("e"),
                    Bytes.toBytes("h"),
                    Bytes.toBytes("l"),
                    Bytes.toBytes("r")
            };

            //createTable(TableDescriptor desc, byte[][] splitKeys)
            //指定分割键创建预分region表 'e','h','l','r'

            admin.createTable(test2.build(),splitKeys);
            System.out.println(Bytes.toString(test2.build().getTableName().getName()) + "表创建 成功 SUCCEED！");
        } catch (Exception e) {
            System.out.println(Bytes.toString(test2.build().getTableName().getName()) + "表创建 失败！FAILED！");
            e.printStackTrace();
        }
    }


    @After
    public void close() {
        if (admin != null) {
            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        if (conn != null) {
            try {
                conn.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}
